#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A set of utilities to write pipelines for performance tests.

This module offers a way to create pipelines using synthetic sources and steps.
Exact shape of the pipeline and the behaviour of sources and steps can be
controlled through arguments. Please see function 'parse_args()' for more
details about the arguments.

Shape of the pipeline is primariy controlled through two arguments. Argument
'steps' can be used to define a list of steps as a JSON string. Argument
'barrier' describes how these steps are separated from each other. Argument
'barrier' can be use to build a pipeline as a a series of steps or a tree of
steps with a fanin or a fanout of size 2.

Other arguments describe what gets generated by synthetic sources that produce
data for the pipeline.
"""

from __future__ import absolute_import
from __future__ import division

import argparse
import json
import logging
import math
import time

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.io import restriction_trackers
from apache_beam.io.restriction_trackers import OffsetRange
from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.core import RestrictionProvider

try:
  import numpy as np
except ImportError:
  np = None


def parse_byte_size(s):
  suffixes = 'BKMGTP'
  if s[-1] in suffixes:
    return int(float(s[:-1]) * 1024 ** suffixes.index(s[-1]))

  return int(s)


def div_round_up(a, b):
  """Return ceil(a/b)."""
  return int(math.ceil(float(a) / b))


def rotate_key(element):
  """Returns a new key-value pair of the same size but with a different key."""
  (key, value) = element
  return key[-1:] + key[:-1], value


def initial_splitting_zipf(start_position, stop_position,
                           desired_num_bundles, distribution_parameter,
                           num_total_records=None):
  """Split the given range (defined by start_position, stop_position) into
     desired_num_bundles using zipf with the given distribution_parameter.
  """
  if not num_total_records:
    num_total_records = stop_position - start_position
  samples = np.random.zipf(distribution_parameter, desired_num_bundles)
  total = sum(samples)
  relative_bundle_sizes = [(float(sample) / total) for sample in samples]
  bundle_ranges = []
  start = start_position
  index = 0
  while start < stop_position:
    if index == desired_num_bundles - 1:
      bundle_ranges.append((start, stop_position))
      break
    stop = start + int(num_total_records * relative_bundle_sizes[index])
    bundle_ranges.append((start, stop))
    start = stop
    index += 1
  return bundle_ranges


class SyntheticStep(beam.DoFn):
  """A DoFn of which behavior can be controlled through prespecified parameters.
  """
  def __init__(self, per_element_delay_sec=0, per_bundle_delay_sec=0,
               output_records_per_input_record=1, output_filter_ratio=0):
    if per_element_delay_sec and per_element_delay_sec < 1e-3:
      raise ValueError('Per element sleep time must be at least 1e-3. '
                       'Received: %r', per_element_delay_sec)
    self._per_element_delay_sec = per_element_delay_sec
    self._per_bundle_delay_sec = per_bundle_delay_sec
    self._output_records_per_input_record = output_records_per_input_record
    self._output_filter_ratio = output_filter_ratio

  def start_bundle(self):
    self._start_time = time.time()

  def finish_bundle(self):
    # The target is for the enclosing stage to take as close to as possible
    # the given number of seconds, so we only sleep enough to make up for
    # overheads not incurred elsewhere.
    to_sleep = self._per_bundle_delay_sec - (time.time() - self._start_time)

    # Ignoring sub-millisecond sleep times.
    if to_sleep >= 1e-3:
      time.sleep(to_sleep)

  def process(self, element):
    if self._per_element_delay_sec >= 1e-3:
      time.sleep(self._per_element_delay_sec)
    filter_element = False
    if self._output_filter_ratio > 0:
      if np.random.random() < self._output_filter_ratio:
        filter_element = True

    if not filter_element:
      for _ in range(self._output_records_per_input_record):
        yield element


class NonLiquidShardingOffsetRangeTracker(OffsetRestrictionTracker):
  """An OffsetRangeTracker that doesn't allow splitting. """

  def try_split(self, split_offset):
    pass  # Don't split.

  def checkpoint(self):
    pass # Don't split.


class SyntheticSDFStepRestrictionProvider(RestrictionProvider):
  """A `RestrictionProvider` for SyntheticSDFStep.

  An initial_restriction and split that operate on num_records and ignores
  source description (element). Splits into initial_splitting_num_bundles.
  Returns size_estimate_override as restriction size, if set. Otherwise uses
  element size.

  If initial_splitting_uneven_chunks, produces uneven chunks.

  """

  def __init__(self, num_records, initial_splitting_num_bundles,
               initial_splitting_uneven_chunks, disable_liquid_sharding,
               size_estimate_override):
    self._num_records = num_records
    self._initial_splitting_num_bundles = initial_splitting_num_bundles
    self._initial_splitting_uneven_chunks = initial_splitting_uneven_chunks
    self._disable_liquid_sharding = disable_liquid_sharding
    self._size_estimate_override = size_estimate_override

  def initial_restriction(self, element):
    return (0, self._num_records)

  def create_tracker(self, restriction):
    if self._disable_liquid_sharding:
      return NonLiquidShardingOffsetRangeTracker(restriction[0],
                                                 restriction[1])
    else:
      return OffsetRestrictionTracker(restriction[0], restriction[1])

  def split(self, element, restriction):
    elems = restriction[1] - restriction[0]
    if (self._initial_splitting_uneven_chunks and
        self._initial_splitting_num_bundles > 1 and elems > 1):
      return initial_splitting_zipf(restriction[0], restriction[1],
                                    self._initial_splitting_num_bundles, 3.0)
    else:
      offsets_per_split = max(1, (elems // self._initial_splitting_num_bundles))
      result = list(
          OffsetRange(restriction[0], restriction[1]).split(
              offsets_per_split, offsets_per_split // 2))
      return [(x.start, x.stop) for x in result]

  def restriction_size(self, element, restriction):
    if self._size_estimate_override is not None:
      return self._size_estimate_override
    element_size = len(element) if isinstance(element, str) else 1
    return (restriction[1] - restriction[0]) * element_size


def get_synthetic_sdf_step(per_element_delay_sec=0,
                           per_bundle_delay_sec=0,
                           output_records_per_input_record=1,
                           output_filter_ratio=0,
                           initial_splitting_num_bundles=8,
                           initial_splitting_uneven_chunks=False,
                           disable_liquid_sharding=False,
                           size_estimate_override=None,):
  """A function which returns a SyntheticSDFStep with given parameters. """

  class SyntheticSDFStep(beam.DoFn):
    """A SplittableDoFn of which behavior can be controlled through prespecified
       parameters.
    """

    def __init__(self, per_element_delay_sec_arg, per_bundle_delay_sec_arg,
                 output_filter_ratio_arg, output_records_per_input_record_arg):
      if per_element_delay_sec_arg:
        per_element_delay_sec_arg = (
            per_element_delay_sec_arg // output_records_per_input_record_arg)
        if per_element_delay_sec_arg < 1e-3:
          raise ValueError(
              'Per element sleep time must be at least 1e-3 after being '
              'divided among output elements.')
      self._per_element_delay_sec = per_element_delay_sec_arg
      self._per_bundle_delay_sec = per_bundle_delay_sec_arg
      self._output_filter_ratio = output_filter_ratio_arg

    def start_bundle(self):
      self._start_time = time.time()

    def finish_bundle(self):
      # The target is for the enclosing stage to take as close to as possible
      # the given number of seconds, so we only sleep enough to make up for
      # overheads not incurred elsewhere.
      to_sleep = self._per_bundle_delay_sec - (
          time.time() - self._start_time)

      # Ignoring sub-millisecond sleep times.
      if to_sleep >= 1e-3:
        time.sleep(to_sleep)

    def process(self,
                element,
                restriction_tracker=beam.DoFn.RestrictionParam(
                    SyntheticSDFStepRestrictionProvider(
                        output_records_per_input_record,
                        initial_splitting_num_bundles,
                        initial_splitting_uneven_chunks,
                        disable_liquid_sharding,
                        size_estimate_override))):
      filter_element = False
      if self._output_filter_ratio > 0:
        if np.random.random() < self._output_filter_ratio:
          filter_element = True

      for k in range(*restriction_tracker.current_restriction()):
        if not restriction_tracker.try_claim(k):
          return

        if self._per_element_delay_sec:
          time.sleep(self._per_element_delay_sec)

        if not filter_element:
          yield element

  return SyntheticSDFStep(per_element_delay_sec, per_bundle_delay_sec,
                          output_filter_ratio, output_records_per_input_record)


class SyntheticSource(iobase.BoundedSource):
  """A custom source of a specified size.
  """

  def __init__(self, input_spec):
    """Initiates a synthetic source.

    Args:
      input_spec: Input specification of the source. See corresponding option in
                  function 'parse_args()' below for more details.
    Raises:
      ValueError: if input parameters are invalid.
    """

    def maybe_parse_byte_size(s):
      return parse_byte_size(s) if isinstance(s, str) else int(s)

    self._num_records = input_spec['numRecords']
    self._key_size = maybe_parse_byte_size(input_spec.get('keySizeBytes', 1))
    self._hot_key_fraction = input_spec.get('hotKeyFraction', 0)
    self._num_hot_keys = input_spec.get('numHotKeys', 0)

    self._value_size = maybe_parse_byte_size(
        input_spec.get('valueSizeBytes', 1))
    self._total_size = self.element_size * self._num_records
    self._initial_splitting = (
        input_spec['bundleSizeDistribution']['type']
        if 'bundleSizeDistribution' in input_spec else 'const')
    if self._initial_splitting != 'const' and self._initial_splitting != 'zipf':
      raise ValueError(
          'Only const and zipf distributions are supported for determining '
          'sizes of bundles produced by initial splitting. Received: %s',
          self._initial_splitting)
    self._initial_splitting_num_bundles = (
        input_spec['forceNumInitialBundles']
        if 'forceNumInitialBundles' in input_spec else 0)
    if self._initial_splitting == 'zipf':
      self._initial_splitting_distribution_parameter = (
          input_spec['bundleSizeDistribution']['param'])
      if self._initial_splitting_distribution_parameter < 1:
        raise ValueError(
            'Parameter for a Zipf distribution must be larger than 1. '
            'Received %r.', self._initial_splitting_distribution_parameter)
    else:
      self._initial_splitting_distribution_parameter = 0
    self._dynamic_splitting = (
        'none' if (
            'splitPointFrequencyRecords' in input_spec
            and input_spec['splitPointFrequencyRecords'] == 0)
        else 'perfect')
    if 'delayDistribution' in input_spec:
      if input_spec['delayDistribution']['type'] != 'const':
        raise ValueError('SyntheticSource currently only supports delay '
                         'distributions of type \'const\'. Received %s.',
                         input_spec['delayDistribution']['type'])
      self._sleep_per_input_record_sec = (
          float(input_spec['delayDistribution']['const']) / 1000)
      if (self._sleep_per_input_record_sec and
          self._sleep_per_input_record_sec < 1e-3):
        raise ValueError('Sleep time per input record must be at least 1e-3.'
                         ' Received: %r', self._sleep_per_input_record_sec)
    else:
      self._sleep_per_input_record_sec = 0

  @property
  def element_size(self):
    return self._key_size + self._value_size

  def estimate_size(self):
    return self._total_size

  def split(self, desired_bundle_size, start_position=0, stop_position=None):
    # Performs initial splitting of SyntheticSource.
    #
    # Exact sizes and distribution of initial splits generated here depends on
    # the input specification of the SyntheticSource.

    if stop_position is None:
      stop_position = self._num_records
    if self._initial_splitting == 'zipf':
      desired_num_bundles = self._initial_splitting_num_bundles or math.ceil(
          float(self.estimate_size()) / desired_bundle_size)
      bundle_ranges = initial_splitting_zipf(
          start_position, stop_position, desired_num_bundles,
          self._initial_splitting_distribution_parameter, self._num_records)
    else:
      if self._initial_splitting_num_bundles:
        bundle_size_in_elements = max(1, int(
            self._num_records /
            self._initial_splitting_num_bundles))
      else:
        bundle_size_in_elements = (max(
            div_round_up(desired_bundle_size, self.element_size),
            int(math.floor(math.sqrt(self._num_records)))))
      bundle_ranges = []
      for start in range(start_position, stop_position,
                         bundle_size_in_elements):
        stop = min(start + bundle_size_in_elements, stop_position)
        bundle_ranges.append((start, stop))

    for start, stop in bundle_ranges:
      yield iobase.SourceBundle(stop - start, self, start, stop)

  def get_range_tracker(self, start_position, stop_position):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._num_records
    tracker = range_trackers.OffsetRangeTracker(start_position, stop_position)
    if self._dynamic_splitting == 'none':
      tracker = range_trackers.UnsplittableRangeTracker(tracker)
    return tracker

  def _gen_kv_pair(self, index):
    r = np.random.RandomState(index)
    rand = r.random_sample()

    # Determines whether to generate hot key or not.
    if rand < self._hot_key_fraction:
      # Generate hot key.
      # An integer is randomly selected from the range [0, numHotKeys-1]
      # with equal probability.
      r_hot = np.random.RandomState(index % self._num_hot_keys)
      return r_hot.bytes(self._key_size), r.bytes(self._value_size)
    else:
      return r.bytes(self._key_size), r.bytes(self._value_size)

  def read(self, range_tracker):
    index = range_tracker.start_position()
    while range_tracker.try_claim(index):
      time.sleep(self._sleep_per_input_record_sec)
      yield self._gen_kv_pair(index)
      index += 1

  def default_output_coder(self):
    return beam.coders.TupleCoder(
        [beam.coders.BytesCoder(), beam.coders.BytesCoder()])


class SyntheticSDFSourceRestrictionProvider(RestrictionProvider):
  """A `RestrictionProvider` for SyntheticSDFAsSource.

  In initial_restriction(element) and split(element), element means source
  description.
  A typical element is like:

    {
      'key_size': 1,
      'value_size': 1,
      'initial_splitting_num_bundles': 8,
      'initial_splitting_desired_bundle_size': 2,
      'sleep_per_input_record_sec': 0,
      'initial_splitting' : 'const'

    }

  """

  def initial_restriction(self, element):
    return (0, element['num_records'])

  def create_tracker(self, restriction):
    return restriction_trackers.OffsetRestrictionTracker(
        restriction[0], restriction[1])

  def split(self, element, restriction):
    bundle_ranges = []
    start_position, stop_position = restriction
    element_size = element['key_size'] + element['value_size']
    estimate_size = element_size * element['num_records']
    if element['initial_splitting'] == 'zipf':
      desired_num_bundles = (
          element['initial_splitting_num_bundles'] or
          div_round_up(estimate_size,
                       element['initial_splitting_desired_bundle_size']))
      samples = np.random.zipf(
          element['initial_splitting_distribution_parameter'],
          desired_num_bundles)
      total = sum(samples)
      relative_bundle_sizes = [(float(sample) / total) for sample in samples]
      start = start_position
      index = 0
      while start < stop_position:
        if index == desired_num_bundles - 1:
          bundle_ranges.append((start, stop_position))
          break
        stop = start + int(
            element['num_records'] * relative_bundle_sizes[index])
        bundle_ranges.append((start, stop))
        start = stop
        index += 1
    else:
      if element['initial_splitting_num_bundles']:
        bundle_size_in_elements = max(1, int(
            element['num_records'] /
            element['initial_splitting_num_bundles']))
      else:
        bundle_size_in_elements = (max(
            div_round_up(
                element['initial_splitting_desired_bundle_size'], element_size),
            int(math.floor(math.sqrt(element['num_records'])))))
      for start in range(start_position, stop_position,
                         bundle_size_in_elements):
        stop = min(start + bundle_size_in_elements, stop_position)
        bundle_ranges.append((start, stop))
    return bundle_ranges

  def restriction_size(self, element, restriction):
    return ((element['key_size'] + element['value_size'])
            * (restriction[1] - restriction[0]))


class SyntheticSDFAsSource(beam.DoFn):
  """A SDF that generates records like a source.

  This SDF accepts a PCollection of record-based source description.
  A typical description is like:

    {
      'key_size': 1,
      'value_size': 1,
      'initial_splitting_num_bundles': 8,
      'initial_splitting_desired_bundle_size': 2,
      'sleep_per_input_record_sec': 0,
      'initial_splitting' : 'const'

    }

  A simple pipeline taking this SDF as a source is like:
    p
    | beam.Create([description1, description2,...])
    | beam.ParDo(SyntheticSDFAsSource())

  NOTE:
    The SDF.process() will have different param content between defining a DoFn
    and runtime.
    When defining an SDF.process, the restriction_tracker should be a
    `RestrictionProvider`.
    During runtime, the DoFnRunner.process_with_sized_restriction() will feed
    a 'RestrictionTracker' based on a restriction to SDF.process().
  """

  def process(
      self,
      element,
      restriction_tracker=beam.DoFn.RestrictionParam(
          SyntheticSDFSourceRestrictionProvider())):
    for k in range(*restriction_tracker.current_restriction()):
      if not restriction_tracker.try_claim(k):
        return
      r = np.random.RandomState(k)
      time.sleep(element['sleep_per_input_record_sec'])
      yield r.bytes(element['key_size']), r.bytes(element['value_size'])


class ShuffleBarrier(beam.PTransform):

  def expand(self, pc):
    return (pc
            | beam.Map(rotate_key)
            | beam.GroupByKey()
            | 'Ungroup' >> beam.FlatMap(
                lambda elm: [(elm[0], v) for v in elm[1]]))


class SideInputBarrier(beam.PTransform):

  def expand(self, pc):
    return (pc
            | beam.Map(rotate_key)
            | beam.Map(
                lambda elem, ignored: elem,
                beam.pvalue.AsIter(pc | beam.FlatMap(lambda elem: None))))


def merge_using_gbk(name, pc1, pc2):
  """Merges two given PCollections using a CoGroupByKey."""

  pc1_with_key = pc1 | (name + 'AttachKey1') >> beam.Map(lambda x: (x, x))
  pc2_with_key = pc2 | (name + 'AttachKey2') >> beam.Map(lambda x: (x, x))

  grouped = (
      {'pc1': pc1_with_key, 'pc2': pc2_with_key} |
      (name + 'Group') >> beam.CoGroupByKey())
  return (grouped |
          (name + 'DeDup') >> beam.Map(lambda elm: elm[0]))  # Ignoring values


def merge_using_side_input(name, pc1, pc2):
  """Merges two given PCollections using side inputs."""

  def join_fn(val, _):  # Ignoring side input
    return val

  return pc1 | name >> beam.core.Map(join_fn, beam.pvalue.AsIter(pc2))


def expand_using_gbk(name, pc):
  """Expands a given PCollection into two copies using GroupByKey."""

  ret = []
  ret.append((pc | ('%s.a' % name) >> ShuffleBarrier()))
  ret.append((pc | ('%s.b' % name) >> ShuffleBarrier()))
  return ret


def expand_using_second_output(name, pc):
  """Expands a given PCollection into two copies using side outputs."""

  class ExpandFn(beam.DoFn):

    def process(self, element):
      yield beam.pvalue.TaggedOutput('second_out', element)
      yield element

  pc1, pc2 = (pc | name >> beam.ParDo(
      ExpandFn()).with_outputs('second_out', main='main_out'))
  return [pc1, pc2]


def _parse_steps(json_str):
  """Converts the JSON step description into Python objects.

  See property 'steps' for more details about the JSON step description.

  Args:
    json_str: a JSON string that describes the steps.

  Returns:
    Information about steps as a list of dictionaries. Each dictionary may have
    following properties.
    (1) per_element_delay - amount of delay for each element in seconds.
    (2) per_bundle_delay - minimum amount of delay for a given step in seconds.
    (3) output_records_per_input_record - number of output elements generated
        for each input element to a step.
    (4) output_filter_ratio - the probability at which a step may filter out a
        given element by not producing any output for that element.
    (5) splittable - if the step should be splittable.
    (6) initial_splitting_num_bundles - number of bundles initial split if step
        is splittable.
    (7) initial_splitting_uneven_chunks - if the bundles should be
        unevenly-sized
    (8) disable_liquid_sharding - if liquid sharding should be disabled
    (9) size_estimate_override - the size estimate or None to use default
  """
  all_steps = []
  json_data = json.loads(json_str)
  for val in json_data:
    steps = {}
    steps['per_element_delay'] = (
        (float(val['per_element_delay_msec']) / 1000)
        if 'per_element_delay_msec' in val else 0)
    steps['per_bundle_delay'] = (
        float(val['per_bundle_delay_sec'])
        if 'per_bundle_delay_sec' in val else 0)
    steps['output_records_per_input_record'] = (
        int(val['output_records_per_input_record'])
        if 'output_records_per_input_record' in val else 1)
    steps['output_filter_ratio'] = (
        float(val['output_filter_ratio'])
        if 'output_filter_ratio' in val else 0)
    steps['splittable'] = (
        bool(val['splittable'])
        if 'splittable' in val else False)
    steps['initial_splitting_num_bundles'] = (
        int(val['initial_splitting_num_bundles'])
        if 'initial_splitting_num_bundles' in val else 8)
    steps['initial_splitting_uneven_chunks'] = (
        bool(val['initial_splitting_uneven_chunks'])
        if 'initial_splitting_uneven_chunks' in val else False)
    steps['disable_liquid_sharding'] = (
        bool(val['disable_liquid_sharding'])
        if 'disable_liquid_sharding' in val else False)
    steps['size_estimate_override'] = (
        int(val['size_estimate_override'])
        if 'size_estimate_override' in val else None)
    all_steps.append(steps)

  return all_steps


def parse_args(args):
  """Parses a given set of arguments.

  Args:
    args: set of arguments to be passed.

  Returns:
    a tuple where first item gives the set of arguments defined and parsed
    within this method and second item gives the set of unknown arguments.
  """

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--steps',
      dest='steps',
      type=_parse_steps,
      help='A JSON string that gives a list where each entry of the list is '
           'configuration information for a step. Configuration for each step '
           'consists of '
           '(1) A float "per_bundle_delay_sec" (in seconds). Defaults to 0.'
           '(2) A float "per_element_delay_msec" (in milli seconds). '
           '    Defaults to 0.'
           '(3) An integer "output_records_per_input_record". Defaults to 1.'
           '(4) A float "output_filter_ratio" in the range [0, 1] . '
           '    Defaults to 0.'
           '(5) A bool "splittable" that defaults to false.'
           '(6) An integer "initial_splitting_num_bundles". Defaults to 8.')

  parser.add_argument(
      '--input',
      dest='input',
      type=json.loads,
      help='A JSON string that describes the properties of the SyntheticSource '
           'used by the pipeline. Configuration is similar to Java '
           'SyntheticBoundedInput.'
           'Currently supports following properties. '
           '(1) An integer "numRecords". '
           '(2) An integer "keySize". '
           '(3) An integer "valueSize". '
           '(4) A tuple "bundleSizeDistribution" with following values. '
           '    A string "type". Allowed values are "const" and "zipf". '
           '    An float "param". Only used if "type"=="zipf". Must be '
           '    larger than 1. '
           '(5) An integer "forceNumInitialBundles". '
           '(6) An integer "splitPointFrequencyRecords". '
           '(7) A tuple "delayDistribution" with following values. '
           '    A string "type". Only allowed value is "const". '
           '    An integer "const". ')

  parser.add_argument('--barrier',
                      dest='barrier',
                      default='shuffle',
                      choices=['shuffle', 'side-input', 'expand-gbk',
                               'expand-second-output', 'merge-gbk',
                               'merge-side-input'],
                      help='Whether to use shuffle as the barrier '
                           '(as opposed to side inputs).')
  parser.add_argument('--output',
                      dest='output',
                      default='',
                      help='Destination to write output.')

  return parser.parse_known_args(args)


def run(argv=None):
  """Runs the workflow."""
  known_args, pipeline_args = parse_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  input_info = known_args.input

  with TestPipeline(options=pipeline_options) as p:
    source = SyntheticSource(input_info)

    # pylint: disable=expression-not-assigned
    barrier = known_args.barrier

    pc_list = []
    num_roots = 2 ** (len(known_args.steps) - 1) if (
        barrier == 'merge-gbk' or barrier == 'merge-side-input') else 1
    for read_no in range(num_roots):
      pc_list.append((p | ('Read %d' % read_no) >> beam.io.Read(source)))

    for step_no, steps in enumerate(known_args.steps):
      if step_no != 0:
        new_pc_list = []
        for pc_no, pc in enumerate(pc_list):
          if barrier == 'shuffle':
            new_pc_list.append(
                (pc |
                 ('shuffle %d.%d' % (step_no, pc_no)) >> ShuffleBarrier()))
          elif barrier == 'side-input':
            new_pc_list.append(
                (pc |
                 ('side-input %d.%d' % (step_no, pc_no)) >> SideInputBarrier()))
          elif barrier == 'expand-gbk':
            new_pc_list.extend(
                expand_using_gbk(('expand-gbk %d.%d' % (step_no, pc_no)), pc))
          elif barrier == 'expand-second-output':
            new_pc_list.extend(
                expand_using_second_output(
                    ('expand-second-output %d.%d' % (step_no, pc_no)), pc))
          elif barrier == 'merge-gbk':
            if pc_no % 2 == 0:
              new_pc_list.append(
                  merge_using_gbk(('merge-gbk %d.%d' % (step_no, pc_no)),
                                  pc, pc_list[pc_no + 1]))
            else:
              continue
          elif barrier == 'merge-side-input':
            if pc_no % 2 == 0:
              new_pc_list.append(
                  merge_using_side_input(
                      ('merge-side-input %d.%d' % (step_no, pc_no)),
                      pc, pc_list[pc_no + 1]))
            else:
              continue

        pc_list = new_pc_list

      new_pc_list = []
      for pc_no, pc in enumerate(pc_list):
        if steps['splittable']:
          step = get_synthetic_sdf_step(
              per_element_delay_sec=steps['per_element_delay'],
              per_bundle_delay_sec=steps['per_bundle_delay'],
              output_records_per_input_record=
              steps['output_records_per_input_record'],
              output_filter_ratio=steps['output_filter_ratio'],
              initial_splitting_num_bundles=
              steps['initial_splitting_num_bundles'],
              initial_splitting_uneven_chunks=
              steps['initial_splitting_uneven_chunks'],
              disable_liquid_sharding=steps['disable_liquid_sharding'],
              size_estimate_override=steps['size_estimate_override'])
        else:
          step = SyntheticStep(
              per_element_delay_sec=steps['per_element_delay'],
              per_bundle_delay_sec=steps['per_bundle_delay'],
              output_records_per_input_record=
              steps['output_records_per_input_record'],
              output_filter_ratio=steps['output_filter_ratio'])
        new_pc = pc | 'SyntheticStep %d.%d' % (
            step_no, pc_no) >> beam.ParDo(step)
        new_pc_list.append(new_pc)
      pc_list = new_pc_list

    if known_args.output:
      # If an output location is provided we format and write output.
      if len(pc_list) == 1:
        (pc_list[0] |
         'FormatOutput' >> beam.Map(lambda elm: (elm[0] + elm[1])) |
         'WriteOutput' >> WriteToText(known_args.output))

  logging.info('Pipeline run completed.')


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
